#!/usr/bin/env python
#
# pgdb.py
#
# Written by D'Arcy J.M. Cain
#
# $Id: pgdb.py,v 1.33 2006/05/28 19:24:43 cito Exp $
#
"""pgdb - DB-API 2.0 compliant module for PygreSQL.
(c) 1999, Pascal Andre .
See package documentation for further information on copyright.
Inline documentation is sparse.
See DB-API 2.0 specification for usage information:
http://www.python.org/peps/pep-0249.html
Basic usage:
pgdb.connect(connect_string) # open a connection
# connect_string = 'host:database:user:password:opt:tty'
# All parts are optional. You may also pass host through
# password as keyword arguments. To pass a port,
# pass it in the host keyword parameter:
pgdb.connect(host='localhost:5432')
connection.cursor() # open a cursor
cursor.execute(query[, params])
# Execute a query, binding params (a dictionary or a sequence) if
# they are passed. The binding syntax is the same as the % operator,
# and parameter values are quoted automatically.
cursor.executemany(query, list of params)
# Execute a query many times, binding each param dictionary
# from the list.
cursor.fetchone() # fetch one row, [value, value, ...]
cursor.fetchall() # fetch all rows, [[value, value, ...], ...]
cursor.fetchmany([size])
# returns size or cursor.arraysize number of rows,
# [[value, value, ...], ...] from result set.
# Default cursor.arraysize is 1.
cursor.description # returns information about the columns
# [(column_name, type_name, display_size,
# internal_size, precision, scale, null_ok), ...]
# Note that precision, scale and null_ok are not implemented.
cursor.rowcount # number of rows available in the result set
# Available after a call to execute.
connection.commit() # commit transaction
connection.rollback() # or rollback transaction
cursor.close() # close the cursor
connection.close() # close the connection
"""
import types
import time
from _pg import *
try: # use mx.DateTime module if available
from mx.DateTime import DateTime, \
TimeDelta, DateTimeType
except ImportError: # otherwise use standard datetime module
from datetime import datetime as DateTime, \
timedelta as TimeDelta, datetime as DateTimeType
### module constants

# DB-API level implemented by this module (DB SIG 2.0)
apilevel = "2.0"

# threads may share the module, but must not share connections
threadsafety = 1

# parameters are bound using extended Python format codes
paramstyle = "pyformat"
### internal type handling class
class pgdbTypeCache:
    """Cache of PostgreSQL type descriptions, keyed by type OID.

    Also converts raw result values into Python objects according to
    the name of their PostgreSQL type.
    """

    def __init__(self, cnx):
        """Create an empty cache that looks types up through *cnx*.

        cnx -- connection object providing a source() method; the source
               object is used to query pg_type on cache misses.
        """
        self.__source = cnx.source()
        self.__type_cache = {}

    def typecast(self, typ, value):
        """Return *value* converted according to the type name *typ*.

        typ   -- type name, compared against the module-level type
                 objects (STRING, BOOL, INTEGER, ...)
        value -- raw value as fetched, or None for SQL NULL

        Values of string, binary, date/time and unrecognised types are
        returned unchanged.
        """
        # for NULL values, no typecast is necessary
        if value is None:
            return value
        if typ == STRING:
            pass
        elif typ == BINARY:
            pass
        elif typ == BOOL:
            value = (value[:1] in ['t', 'T'])
        elif typ == INTEGER:
            value = int(value)
        elif typ == LONG:
            value = long(value)
        elif typ == FLOAT:
            value = float(value)
        elif typ == MONEY:
            # strip currency sign and thousands separators before parsing
            value = float(value.replace("$", "").replace(",", ""))
        elif typ == DATETIME:
            # format may differ ... we'll give string
            pass
        elif typ == ROWID:
            value = long(value)
        return value

    def getdescr(self, oid):
        """Return the description tuple for the type with the given OID.

        The result is (type_name, None, type_length, None, None, None),
        i.e. a DB-API description entry without the leading column name,
        which the caller has to prepend.  Results are cached per OID.
        """
        try:
            return self.__type_cache[oid]
        except KeyError:
            # not cached yet: fall through and ask the server
            pass
        self.__source.execute(
            "SELECT typname, typlen "
            "FROM pg_type WHERE oid = %s" % oid
        )
        row = self.__source.fetch(1)[0]
        res = (
            row[0],
            None, int(row[1]),
            None, None, None
        )
        self.__type_cache[oid] = res
        return res
### cursor object
class pgdbCursor:
    """DB-API 2.0 cursor working on a low-level source object.

    Public attributes:
    description -- None, or after a query returning rows a list of
                   tuples (column_name, type_name, None, type_length,
                   None, None, None), one per result column
    rowcount    -- -1 until a statement has been executed
    arraysize   -- default number of rows returned by fetchmany()
    lastrowid   -- value of the source's oidstatus() after the last
                   executed statement
    """

    def __init__(self, src, cache):
        """Bind the cursor to source object *src* and type cache *cache*."""
        self.__cache = cache
        self.__source = src
        self.description = None
        self.rowcount = -1
        self.arraysize = 1
        self.lastrowid = None

    def close(self):
        """Close the underlying source object and reset the cursor state."""
        self.__source.close()
        self.description = None
        self.rowcount = -1
        self.lastrowid = None

    def arraysize(self, size):
        # NOTE: on instances this method is shadowed by the integer
        # attribute of the same name assigned in __init__, so it is only
        # reachable through the class.  Kept for backward compatibility.
        self.arraysize = size

    def execute(self, operation, params=None):
        """Execute *operation*, optionally binding *params*.

        A list of tuples is treated as a sequence of parameter sets and
        passed on to executemany().
        """
        # "The parameters may also be specified as list of
        # tuples to e.g. insert multiple rows in a single
        # operation, but this kind of usage is deprecated."
        if params and isinstance(params, types.ListType) and \
                isinstance(params[0], types.TupleType):
            self.executemany(operation, params)
        else:
            # not a list of tuples
            self.executemany(operation, (params,))

    def executemany(self, operation, param_seq):
        """Execute *operation* once for every entry of *param_seq*.

        Entries that are None execute the operation text unchanged; all
        other entries are quoted and bound with the % operator.

        Raises DatabaseError if the statement fails with a module Error
        and OperationalError for any other exception.
        """
        self.description = None
        self.rowcount = -1
        # first try to execute all queries
        totrows = 0
        sql = "INIT"
        try:
            for params in param_seq:
                if params is not None:
                    sql = _quoteparams(operation, params)
                else:
                    sql = operation
                rows = self.__source.execute(sql)
                # NOTE(review): execute() apparently returns a row count
                # for statements that modify data and None otherwise --
                # confirm against the _pg source object.
                if rows is not None:
                    totrows += rows
        except Error as msg:
            raise DatabaseError("error '%s' in '%s'" % (msg, sql))
        except Exception as err:
            raise OperationalError("internal error in '%s': %s" % (sql, err))
        # then initialize result row count and description
        if self.__source.resulttype == RESULT_DQL:
            self.rowcount = self.__source.ntuples
            getdescr = self.__cache.getdescr
            # listinfo is a sequence of (index, column_name, type_oid);
            # getdescr returns all items needed for a description tuple
            # except the column_name.
            self.description = [
                info[1:2] + getdescr(info[2])
                for info in self.__source.listinfo()
            ]
        else:
            self.rowcount = totrows
            self.description = None
        self.lastrowid = self.__source.oidstatus()

    def fetchone(self):
        """Return the next row as a list, or None if no row is left."""
        rows = self.fetchmany(1, 0)
        if rows:
            return rows[0]
        return None

    def fetchall(self):
        """Return all rows as a list of lists (fetches with size -1)."""
        return self.fetchmany(-1, 0)

    def fetchmany(self, size=None, keep=0):
        """Return up to *size* rows as a list of lists of typecast values.

        size -- number of rows to fetch; defaults to self.arraysize
        keep -- if 1, remember *size* as the new self.arraysize

        Raises DatabaseError if the source object reports an Error.
        """
        if size is None:
            size = self.arraysize
        if keep == 1:
            self.arraysize = size
        try:
            fetched = self.__source.fetch(size)
        except Error as e:
            raise DatabaseError(str(e))
        typecast = self.__cache.typecast
        description = self.description
        # convert every value according to the type name of its column
        return [
            [typecast(description[i][1], value)
             for i, value in enumerate(row)]
            for row in fetched
        ]

    def nextset(self):
        """Not supported; always raises NotSupportedError."""
        raise NotSupportedError("nextset() is not supported")

    def setinputsizes(self, sizes):
        """Accepted for DB-API compatibility; does nothing."""
        pass

    def setoutputsize(self, size, col=0):
        """Accepted for DB-API compatibility; does nothing."""
        pass
class _quoteitem(dict):
    """Dictionary whose item lookups return SQL-quoted values."""

    def __getitem__(self, key):
        # fetch the stored value, then quote it for use in a statement
        raw = super(_quoteitem, self).__getitem__(key)
        return _quote(raw)
def _quote(x):
    """Return *x* in a form suitable for insertion into an SQL statement.

    Strings (and date/time or unicode objects, which are converted to
    strings first) are wrapped in single quotes with backslashes and
    single quotes escaped; numbers are returned unchanged; None becomes
    NULL; lists and tuples become a parenthesised, comma-separated list
    of quoted items; other objects must provide __pg_repr__().

    Raises InterfaceError for any other type.
    """
    # bring date/time and unicode values into plain string form first
    if isinstance(x, DateTimeType):
        x = str(x)
    elif isinstance(x, unicode):
        x = x.encode( 'utf-8' )

    if isinstance(x, types.StringType):
        escaped = str(x).replace("\\", "\\\\").replace("'", "''")
        return "'%s'" % escaped
    if isinstance(x, (types.IntType, types.LongType, types.FloatType)):
        return x
    if x is None:
        return 'NULL'
    if isinstance(x, (types.ListType, types.TupleType)):
        return '(%s)' % ','.join([str(_quote(item)) for item in x])
    if hasattr(x, '__pg_repr__'):
        return x.__pg_repr__()
    raise InterfaceError('do not know how to handle type %s' % type(x))
def _quoteparams(s, params):
    """Return format string *s* with the quoted *params* bound to it.

    Objects with a has_key attribute are treated as mappings and quoted
    on lookup; anything else is iterated and quoted item by item.
    """
    if not hasattr(params, 'has_key'):
        return s % tuple([_quote(item) for item in params])
    return s % _quoteitem(params)
### connection object
class pgdbCnx:
    """DB-API 2.0 connection wrapping a low-level connection object.

    A transaction is opened when the object is created and reopened
    after every commit() and rollback().
    """

    def __init__(self, cnx):
        """Wrap *cnx* and start the first transaction.

        Raises OperationalError if the BEGIN statement cannot be run.
        """
        self.__cnx = cnx
        self.__cache = pgdbTypeCache(cnx)
        try:
            src = self.__cnx.source()
            src.execute("BEGIN")
        except Exception:
            raise OperationalError("invalid connection.")

    def __restart_transaction(self, command):
        # End the current transaction with *command*, then open a new one.
        src = self.__cnx.source()
        src.execute(command)
        src.execute("BEGIN")

    def close(self):
        """Close the underlying connection."""
        self.__cnx.close()

    def commit(self):
        """Commit the current transaction and begin a new one.

        Raises OperationalError if either statement fails.
        """
        try:
            self.__restart_transaction("COMMIT")
        except Exception:
            raise OperationalError("can't commit.")

    def rollback(self):
        """Roll back the current transaction and begin a new one.

        Raises OperationalError if either statement fails.
        """
        try:
            self.__restart_transaction("ROLLBACK")
        except Exception:
            raise OperationalError("can't rollback.")

    def cursor(self):
        """Return a new pgdbCursor sharing this connection's type cache.

        Raises OperationalError if the cursor cannot be created.
        """
        try:
            src = self.__cnx.source()
            return pgdbCursor(src, self.__cache)
        except Exception:
            raise OperationalError("invalid connection.")
### module interface
# connects to a database
# Keep a reference to the connect() that is in scope at this point
# (presumably the low-level one star-imported from _pg) under a private
# name: the connect() defined right after this line rebinds the public
# name and calls the saved function through _connect_.
_connect_ = connect
def connect(dsn = None,
        user = None, password = None,
        host = None, database = None):
    """Open a database connection and return it as a pgdbCnx object.

    dsn      -- 'host:database:user:password:opt:tty'; every part is
                optional and missing trailing parts default to ''
    user     -- overrides the user given in the DSN
    password -- overrides the password given in the DSN
    host     -- 'hostname' or 'hostname:port'; overrides the DSN host
    database -- overrides the database given in the DSN

    An empty host or user name is passed on as None.
    """
    # first get params from DSN
    dbport = -1
    fields = [""] * 6
    try:
        parts = dsn.split(":")[:6]
    except AttributeError:
        # no DSN given (or not a string): keep all defaults
        parts = []
    fields[:len(parts)] = parts
    dbhost, dbbase, dbuser, dbpasswd, dbopt, dbtty = fields
    # override if necessary
    if user is not None:
        dbuser = user
    if password is not None:
        dbpasswd = password
    if database is not None:
        dbbase = database
    if host is not None:
        try:
            params = host.split(":")
            dbhost = params[0]
            dbport = int(params[1])
        except (AttributeError, IndexError, ValueError):
            # best effort: a missing or malformed port leaves the
            # default port in place
            pass
    # empty host is localhost; an empty user is likewise passed as None
    if dbhost == "":
        dbhost = None
    if dbuser == "":
        dbuser = None
    # open the connection
    cnx = _connect_(dbbase, dbhost, dbport, dbopt,
        dbtty, dbuser, dbpasswd)
    return pgdbCnx(cnx)
### types handling
# PostgreSQL is object-oriented: types are dynamic.
# We must thus use type names as internal type codes.
class pgdbType:
    """Type object that compares equal to each of a set of type names.

    The names passed to the constructor are stored in the 'values'
    tuple; comparing the object with any one of them yields equality.
    """

    def __init__(self, *values):
        self.values = values

    def __cmp__(self, other):
        # 0 (equal) for any contained name, otherwise an ordering
        # derived from comparing the other object with the value tuple
        if other in self.values:
            return 0
        if other < self.values:
            return 1
        else:
            return -1

    # Explicit equality operators give the same answers as __cmp__ but
    # do not depend on the three-way comparison fallback.
    def __eq__(self, other):
        return other in self.values

    def __ne__(self, other):
        return other not in self.values
# character types
STRING = pgdbType('char', 'bpchar', 'name', 'text', 'varchar')

# BLOB support is pg specific
BINARY = pgdbType()

# numeric types
INTEGER = pgdbType('int2', 'int4', 'serial')
LONG = pgdbType('int8')
FLOAT = pgdbType('float4', 'float8', 'numeric')
# NUMBER covers all integer, long and float type names, in that order
NUMBER = pgdbType(*(INTEGER.values + LONG.values + FLOAT.values))

BOOL = pgdbType('bool')
MONEY = pgdbType('money')

# this may be problematic as the types are quite different ...
# I hope it won't hurt
DATETIME = pgdbType(
    'abstime', 'reltime', 'tinterval',
    'date', 'time', 'timespan',
    'timestamp', 'timestamptz', 'interval'
)

# OIDs are used for everything (types, tables, BLOBs, rows, ...). This
# may cause confusion, but we are unable to find out what exactly is
# behind the OID (at least not easily enough). Should this be undefined
# as BLOBs ?
ROWID = pgdbType('oid', 'oid8')
# mandatory type helpers
def Date(year, month, day):
    """Construct a date value from year, month and day."""
    date_value = DateTime(year, month, day)
    return date_value
def Time(hour, minute, second):
    """Construct a time value from hour, minute and second.

    The components are passed by keyword: the standard library
    timedelta used as fallback for TimeDelta would otherwise interpret
    three positional arguments as (days, seconds, microseconds).
    """
    return TimeDelta(hours=hour, minutes=minute, seconds=second)
def Timestamp(year, month, day, hour, minute, second):
    """Construct a timestamp value from date and time components."""
    stamp = DateTime(year, month, day, hour, minute, second)
    return stamp
def DateFromTicks(ticks):
    """Construct a date value from seconds since the epoch (local time)."""
    year, month, day = time.localtime(ticks)[:3]
    return Date(year, month, day)
def TimeFromTicks(ticks):
    """Construct a time value from seconds since the epoch (local time)."""
    hour, minute, second = time.localtime(ticks)[3:6]
    return Time(hour, minute, second)
def TimestampFromTicks(ticks):
    """Construct a timestamp from seconds since the epoch (local time)."""
    fields = time.localtime(ticks)[:6]
    return Timestamp(*fields)
def Binary(str):
    """Return the given value unchanged; no conversion is applied."""
    value = str
    return value
# if run as script, print some information
if __name__ == '__main__':
    # Each print gets exactly one parenthesised argument, which produces
    # the same output whether print is a statement or a function.
    print(' '.join(('PyGreSQL version', str(version))))
    print('')
    print(__doc__)