]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
(no commit message)
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Oct 2005 19:15:17 +0000 (19:15 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Oct 2005 19:15:17 +0000 (19:15 +0000)
lib/sqlalchemy/databases/postgres.py
lib/sqlalchemy/sql.py
test/engines.py

index 000f403f1a19640170d6ca38cdf72e8db2799fea..f2057b90986599f8c943ad4b232b49d6334df9e1 100644 (file)
@@ -51,7 +51,7 @@ class PGChar(sqltypes.CHAR):
         return "CHAR(%(length)s)" % {'length' : self.length}
 class PGBinary(sqltypes.Binary):
     def get_col_spec(self):
-        return "BLOB"
+        return "BYTEA"
 class PGBoolean(sqltypes.Boolean):
     def get_col_spec(self):
         return "BOOLEAN"
@@ -168,7 +168,80 @@ class PGSQLEngine(ansisql.ANSISQLEngine):
         return self.module
 
     def reflecttable(self, table):
-        raise NotImplementedError()
+        
+        columns = schema.Table("columns", table.engine,
+            Column("table_schema", String),
+            Column("table_name", String),
+            Column("column_name", String),
+            Column("is_nullable", Integer),
+            Column("data_type", String),
+            Column("ordinal_position", Integer),
+            schema="information_schema")
+            
+        constraints = schema.Table("table_constraints", table.engine,
+            Column("table_schema", String),
+            Column("table_name", String),
+            Column("constraint_name", String),
+            Column("constraint_type", String),
+            schema="information_schema")
+
+        column_constraints = schema.Table("constraint_column_usage", table.engine,
+            Column("table_schema", String),
+            Column("table_name", String),
+            Column("column_name", String),
+            Column("constraint_name", String),
+            schema="information_schema")
+            
+        s = columns.select(columns.c.table_name==table.name,
+            from_obj=[sql.join(columns, column_constraints, 
+                sql.and_(
+                        columns.c.table_name==column_constraints.c.table_name,
+                        columns.c.table_schema==column_constraints.c.table_schema,
+                        columns.c.column_name==column_constraints.c.column_name,
+                    ), 
+                isouter=True).join(constraints, 
+                    sql.and_(
+                        column_constraints.c.table_schema==constraints.c.table_schema,
+                        column_constraints.c.constraint_name==constraints.c.constraint_name,
+                        constraints.c.constraint_type=='PRIMARY KEY'
+                    ), isouter=True)],
+            order_by=[columns.c.ordinal_position])
+        s.append_column(constraints.c.constraint_type)    
+        if table.schema is not None:
+            s.append_whereclause(columns.c.table_schema==table.schema)
+        else:
+            current_schema = text("select current_schema()", table.engine).execute().fetchone()[0]
+            s.append_whereclause(columns.c.table_schema==current_schema)
+
+        c = s.execute()
+        while True:
+            row = c.fetchone()
+            if row is None:
+                break
+            print "row! " + repr(row)
+            continue
+            (name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5])
+
+            match = re.match(r'(\w+)(\(.*?\))?', type)
+            coltype = match.group(1)
+            args = match.group(2)
+
+            #print "coltype: " + repr(coltype) + " args: " + repr(args)
+            coltype = pragma_names[coltype]
+            if args is not None:
+                args = re.findall(r'(\d+)', args)
+                #print "args! " +repr(args)
+                coltype = coltype(*args)
+            table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable))
+        c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
+        while True:
+            row = c.fetchone()
+            if row is None:
+                break
+            (tablename, localcol, remotecol) = (row[2], row[3], row[4])
+            #print "row! " + repr(row)
+            remotetable = Table(tablename, self, autoload = True)
+            table.c[localcol].foreign_key = schema.ForeignKey(remotetable.c[remotecol])
 
 class PGCompiler(ansisql.ANSICompiler):
     def bindparam_string(self, name):
index 700ae64fce3266cd53d92a40c7ed9d95285d04dd..039e3cd768503c0f8b4a82a361c4409811b07a0d 100644 (file)
@@ -750,7 +750,12 @@ class Select(Selectable):
         for f in self.whereclause._get_from_objects():
             self.froms.setdefault(f.id, f)
 
-   
+    def append_whereclause(self, clause):
+        if self.whereclause is not None:
+            self.whereclause = and_(self.whereclause, clause)
+        else:
+            self.whereclause = clause
+        
     def clear_from(self, id):
         self.append_from(FromClause(from_name = None, from_key = id))
         
index 7edb700bbbf45bed184be631d82e33c02db9d9bc..2d7651050af8b2241251c9de9e76b75083f0a458 100644 (file)
@@ -16,7 +16,8 @@ import unittest, re
 class EngineTest(PersistTest):
     def testsqlitetableops(self):
         import sqlalchemy.databases.sqlite as sqllite
-        db = sqllite.engine(':memory:', {}, echo = testbase.echo)
+#        db = sqllite.engine(':memory:', {}, echo = testbase.echo)
+        db = postgres.engine({'database':'test', 'host':'127.0.0.1', 'user':'scott', 'password':'tiger'}, echo = testbase.echo)
         self.do_tableops(db)
         
     def do_tableops(self, db):
@@ -41,6 +42,9 @@ class EngineTest(PersistTest):
             Column('remote_user_id', Integer, foreign_key = ForeignKey(users.c.user_id)),
             Column('email_address', String(20)),
         )
+
+        users.drop()
+        addresses.drop()
         
 #        users.c.parent_user_id.set_foreign_key(ForeignKey(users.c.user_id))
 
@@ -50,7 +54,7 @@ class EngineTest(PersistTest):
         # clear out table registry
         db.tables.clear()
 
-        users = Table('users', db, autoload = True)
+#        users = Table('users', db, autoload = True)
         addresses = Table('email_addresses', db, autoload = True)
 
         users.drop()