Coverage for pydblite/sqlite : 71%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# # BSD licence # # Author : Pierre Quentel (pierre.quentel@gmail.com) #
Main differences from :mod:`pydblite.pydblite`:
- pass the connection to the :class:`SQLite db <pydblite.sqlite.Database>` as argument to :class:`Table <pydblite.sqlite.Table>` - in :func:`create() <pydblite.sqlite.Table.create>` field definitions must specify a type. - no `drop_field` (not supported by SQLite) - the :class:`Table <pydblite.sqlite.Table>` instance has a :attr:`cursor <pydblite.sqlite.Database.cursor>` attribute, so that raw SQL requests can be executed. """
except ImportError: import io unicode = str # used in tests
def to_str(val): # leaves a Unicode unchanged return val
# test if sqlite is installed or raise exception except ImportError: try: from pysqlite2 import dbapi2 as sqlite from pysqlite2._sqlite import OperationalError except ImportError: print("SQLite is not installed") raise
# compatibility with Python 2.3 except NameError: from sets import Set as set # NOQA
# classes for CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP return datetime.date.today().strftime('%Y-%M-%D')
return datetime.datetime.now().strftime('%h:%m:%s')
return datetime.datetime.now().strftime('%Y-%M-%D %h:%m:%s')
# functions to convert a value returned by a SQLite SELECT
# CURRENT_TIME format is HH:MM:SS # CURRENT_DATE : YYYY-MM-DD # CURRENT_TIMESTAMP : YYYY-MM-DD HH:MM:SS
# DATE : convert YYYY-MM-DD to datetime.date instance return None raise ValueError("Bad value %s for DATE format" % date)
# TIME : convert HH:MM:SS to datetime.time instance if _time is None: return None mo = c_time_fmt.match(_time) if not mo: raise ValueError("Bad value %s for TIME format" % _time) hour, minute, second = [int(x) for x in mo.groups()] return datetime.time(hour, minute, second)
# DATETIME or TIMESTAMP : convert YYYY-MM-DD HH:MM:SS # to datetime.datetime instance if timestamp is None: return None if not isinstance(timestamp, unicode): raise ValueError("Bad value %s for TIMESTAMP format" % timestamp) mo = c_tmsp_fmt.match(timestamp) if not mo: raise ValueError("Bad value %s for TIMESTAMP format" % timestamp) return datetime.datetime(*[int(x) for x in mo.groups()])
# if default value is CURRENT_DATE etc. SQLite doesn't # give the information, default is the value of the # variable as a string. We have to guess... # h, m, s = [int(x) for x in mo.groups()] if (0 <= h <= 23) and (0 <= m <= 59) and (0 <= s <= 59): return CurrentTime y, m, d = [int(x) for x in mo.groups()] try: datetime.date(y, m, d) return CurrentDate except: pass y, mth, d, h, mn, s = [int(x) for x in mo.groups()] try: datetime.datetime(y, mth, d, h, mn, s) return CurrentTimestamp except: pass
"""SQLiteError"""
""" To create an in-memory database provide ':memory:' as filename
Args: - filename (str): The name of the database file, or ':memory:' - kw (dict): Arguments forwarded to sqlite3.connect """ """The SQLite connection""" """The SQLite connection's cursor"""
"""Return the list of table names in the database"""
"""Save any changes to the database"""
# drop table if isinstance(table, Table): table = table.name self.cursor.execute('DROP TABLE %s' % table) dict.__delitem__(self, table)
""" Args:
- table_name (str): The name of the SQLite table. - db (:class:`Database <pydblite.sqlite.Database>`): The database.
""" """The SQLite connection's cursor"""
""" Create a new table.
Args: - fields (list of tuples): The fields names/types to create. For each field, a 2-element tuple must be provided:
- the field name - a string with additional information like field type + other information using the SQLite syntax eg ('name', 'TEXT NOT NULL'), ('date', 'BLOB DEFAULT CURRENT_DATE')
- mode (str): The mode used when creating the database. mode is only used if a database file already exists.
- if mode = 'open' : open the existing base, ignore the fields - if mode = 'override' : erase the existing base and create a new one with the specified fields
Returns: - the database (self). """
else:
"""Open an existing database."""
"""Save any changes to the database"""
"""Inspect the base to get field names.""" # can be null ? # default value
# returns information about the table return [(field, self.field_info[field]) for field in self.fields]
msg = "Error in field definition %s" % field msg += ": should be a tuple with field_name, field_info, and optionally a default value" raise SQLiteError(msg)
"""When a record is returned by a SELECT, ask conversion of specified field value with the specified function.""" raise NameError("Unknown field %s" % field_name)
"""Ask conversion of field to an instance of datetime.date"""
"""Ask conversion of field to an instance of datetime.time""" self.conv(field_name, to_time)
"""Ask conversion of field to an instance of datetime.datetime""" self.conv(field_name, to_datetime)
"""Insert a record in the database.
Parameters can be positional or keyword arguments. If positional they must be in the same order as in the :func:`create` method.
Returns: - The record identifier """
"""Insert a list or tuple of records
Returns: - The last row id """ else: ','.join(['?' for f in self.fields])) except: raise Exception(self._err_msg(sql, args)) # return last row id
"""Remove a single record, or the records in an iterable.
Before starting deletion, test if all records are in the base and don't have twice the same __id__.
Returns: - int: the number of deleted items """ # remove a single record else: # convert iterable into a list removed = [r for r in removed] if not removed: return 0 args = [r['__id__'] for r in removed] sql += "WHERE rowid IN (%s)" % (','.join(['?'] * len(args)))
"""Update the record with new keys and values.""" ",".join(vals))
"""Make a list of strings to pass to an SQL statement from the dictionary kw with Python types."""
"""Make a record dictionary from the result of a fetch_"""
"""Add a new column to the table.
Args: - name (string): The name of the field - column_type (string): The data type of the column (Defaults to TEXT) - default (datatype): The default value for this field (if any)
"""
raise SQLiteError("Dropping fields is not supported by SQLite")
""" Selection by field values.
db(key=value) returns the list of records where r[key] = value
Args: - args (list): A field to filter on. - kw (dict): pairs of field and value to filter on.
Returns: - When args supplied, return a :class:`Filter <pydblite.common.Filter>` object that filters on the specified field. - When kw supplied, return all the records where field values matches the key/values in kw.
""" raise SyntaxError("Can't specify positional AND keyword arguments")
raise SyntaxError("Only one field can be specified") use_expression = True raise ValueError("%s is not a field" % args[0]) else:
sql = "SELECT rowid,* FROM %s WHERE %s" % (self.name, args[0]) self.cursor.execute(sql) return [self._make_record(row) for row in self.cursor.fetchall()] else: raise ValueError("Fields %s not in the database" % undef) else:
"""Direct access by record id.""" else:
else:
"""Delete by record id"""
"""Iteration on the records"""
msg = "Exception for table %s.%s\n" % (self.db, self.name) msg += 'SQL request %s\n' % sql if args: import pprint msg += 'Arguments : %s\n' % pprint.saferepr(args) out = io.StringIO() traceback.print_exc(file=out) msg += out.getvalue() return msg
sql = "SELECT %s, COUNT(*) FROM %s GROUP BY %s WHERE %s" % (group_by, self.name, group_by, db_filter) else:
sql += " WHERE %s" % db_filter
except OperationalError: return indices
|