def get_col_spec(self):
return "BIGINT"
-class PG2DateTime(sqltypes.DateTime):
+class PGDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
-class PG1DateTime(sqltypes.DateTime):
- def convert_bind_param(self, value, dialect):
- if value is not None:
- if isinstance(value, datetime.datetime):
- seconds = float(str(value.second) + "."
- + str(value.microsecond))
- mx_datetime = mxDateTime(value.year, value.month, value.day,
- value.hour, value.minute,
- seconds)
- return dialect.dbapi.TimestampFromMx(mx_datetime)
- return dialect.dbapi.TimestampFromMx(value)
- else:
- return None
-
- def convert_result_value(self, value, dialect):
- if value is None:
- return None
- second_parts = str(value.second).split(".")
- seconds = int(second_parts[0])
- microseconds = int(second_parts[1])
- return datetime.datetime(value.year, value.month, value.day,
- value.hour, value.minute, seconds,
- microseconds)
-
- def get_col_spec(self):
- return "TIMESTAMP " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
-
-class PG2Date(sqltypes.Date):
- def get_col_spec(self):
- return "DATE"
-
-class PG1Date(sqltypes.Date):
- def convert_bind_param(self, value, dialect):
- # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
-        # this one doesn't seem to work with the "emulation" mode
- if value is not None:
- return dialect.dbapi.DateFromMx(value)
- else:
- return None
-
- def convert_result_value(self, value, dialect):
- # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
- return value
-
+class PGDate(sqltypes.Date):
def get_col_spec(self):
return "DATE"
-class PG2Time(sqltypes.Time):
- def get_col_spec(self):
- return "TIME " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
-
-class PG1Time(sqltypes.Time):
- def convert_bind_param(self, value, dialect):
- # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
-        # this one doesn't seem to work with the "emulation" mode
- if value is not None:
- return psycopg.TimeFromMx(value)
- else:
- return None
-
- def convert_result_value(self, value, dialect):
- # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
- return value
-
+class PGTime(sqltypes.Time):
def get_col_spec(self):
return "TIME " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
def get_col_spec(self):
return "BOOLEAN"
-pg2_colspecs = {
+colspecs = {
sqltypes.Integer : PGInteger,
sqltypes.Smallinteger : PGSmallInteger,
sqltypes.Numeric : PGNumeric,
sqltypes.Float : PGFloat,
- sqltypes.DateTime : PG2DateTime,
- sqltypes.Date : PG2Date,
- sqltypes.Time : PG2Time,
+ sqltypes.DateTime : PGDateTime,
+ sqltypes.Date : PGDate,
+ sqltypes.Time : PGTime,
sqltypes.String : PGString,
sqltypes.Binary : PGBinary,
sqltypes.Boolean : PGBoolean,
sqltypes.TEXT : PGText,
sqltypes.CHAR: PGChar,
}
-pg1_colspecs = pg2_colspecs.copy()
-pg1_colspecs.update({
- sqltypes.DateTime : PG1DateTime,
- sqltypes.Date : PG1Date,
- sqltypes.Time : PG1Time
- })
-
-pg2_ischema_names = {
+
+ischema_names = {
'integer' : PGInteger,
'bigint' : PGBigInteger,
'smallint' : PGSmallInteger,
'real' : PGFloat,
'inet': PGInet,
'double precision' : PGFloat,
- 'timestamp' : PG2DateTime,
- 'timestamp with time zone' : PG2DateTime,
- 'timestamp without time zone' : PG2DateTime,
- 'time with time zone' : PG2Time,
- 'time without time zone' : PG2Time,
- 'date' : PG2Date,
- 'time': PG2Time,
+ 'timestamp' : PGDateTime,
+ 'timestamp with time zone' : PGDateTime,
+ 'timestamp without time zone' : PGDateTime,
+ 'time with time zone' : PGTime,
+ 'time without time zone' : PGTime,
+ 'date' : PGDate,
+ 'time': PGTime,
'bytea' : PGBinary,
'boolean' : PGBoolean,
'interval':PGInterval,
}
-pg1_ischema_names = pg2_ischema_names.copy()
-pg1_ischema_names.update({
- 'timestamp with time zone' : PG1DateTime,
- 'timestamp without time zone' : PG1DateTime,
- 'date' : PG1Date,
- 'time' : PG1Time
- })
def descriptor():
return {'name':'postgres',
ansisql.ANSIDialect.__init__(self, default_paramstyle='pyformat', **kwargs)
self.use_oids = use_oids
self.server_side_cursors = server_side_cursors
- if self.dbapi is None or not hasattr(self.dbapi, '__version__') or self.dbapi.__version__.startswith('2'):
- self.version = 2
- else:
- self.version = 1
self.use_information_schema = use_information_schema
self.paramstyle = 'pyformat'
def dbapi(cls):
- try:
- import psycopg2 as psycopg
- except ImportError, e:
- try:
- import psycopg
- except ImportError, e2:
- raise e
+ import psycopg2 as psycopg
return psycopg
dbapi = classmethod(dbapi)
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
if opts.has_key('port'):
- if self.version == 2:
- opts['port'] = int(opts['port'])
- else:
- opts['port'] = str(opts['port'])
+ opts['port'] = int(opts['port'])
opts.update(url.query)
return ([], opts)
return 68
def type_descriptor(self, typeobj):
- if self.version == 2:
- return sqltypes.adapt_type(typeobj, pg2_colspecs)
- else:
- return sqltypes.adapt_type(typeobj, pg1_colspecs)
+ return sqltypes.adapt_type(typeobj, colspecs)
def compiler(self, statement, bindparams, **kwargs):
return PGCompiler(self, statement, bindparams, **kwargs)
return False
def reflecttable(self, connection, table):
- if self.version == 2:
- ischema_names = pg2_ischema_names
- else:
- ischema_names = pg1_ischema_names
-
if self.use_information_schema:
ischema.reflecttable(connection, table, ischema_names)
else: