|
/** A set backed by a database table */ |
|
public class PersistentDatafileSetStraightforward extends AbstractSet<Datafile> { |
|
|
|
public enum StorageType {Hive, JDBC} |
|
|
|
public static final String JDBC_URL = "jdbc.url"; |
|
public static final String JDBC_USERNAME = "jdbc.username"; |
|
public static final String JDBC_PASSWORD = "jdbc.password"; |
|
|
|
private final Configuration conf; |
|
private final String tableName; |
|
private final StorageType storageType; |
|
|
|
public PersistentDatafileSetStraightforward(Configuration conf, |
|
String tableName, |
|
StorageType storageType) { |
|
this.tableName = tableName; |
|
this.storageType = storageType; |
|
this.conf = conf; |
|
} |
|
|
|
@Override |
|
public boolean add(Datafile datafile) { |
|
String filename = datafile.getFilename(); |
|
long published = datafile.getTimestamp(); |
|
|
|
try (Connection connection = createConnection(); |
|
Statement statement = connection.createStatement()) { |
|
|
|
connection.setAutoCommit(false); |
|
|
|
if (countRows(statement, filename, published) > 0) { |
|
return false; |
|
} |
|
|
|
String insertStatement = makeInsertStatement(filename, published); |
|
statement.execute(insertStatement); |
|
connection.commit(); |
|
return true; |
|
} catch (SQLException | IOException e) { |
|
throw new RuntimeException(e); |
|
} |
|
} |
|
|
|
@Override |
|
public boolean contains(Object o) { |
|
Datafile datafile = (Datafile) o; |
|
|
|
String filename = datafile.getFilename(); |
|
long timestamp = datafile.getTimestamp(); |
|
|
|
try (Statement statement = createConnection().createStatement()) { |
|
return countRows(statement, filename, timestamp) > 0; |
|
} catch (SQLException|IOException e) { |
|
throw new RuntimeException("Error reading database", e); |
|
} |
|
} |
|
|
|
@Override |
|
public int size() { |
|
try (Statement statement = createConnection().createStatement()) { |
|
return countRows(statement); |
|
} catch (SQLException|IOException e) { |
|
throw new RuntimeException("Error reading database", e); |
|
} |
|
} |
|
|
|
@Override |
|
public Iterator<Datafile> iterator() { |
|
try (Statement statement = createConnection().createStatement()) { |
|
|
|
String getAllStatement = makeGetAllStatement(); |
|
ResultSet results = statement.executeQuery(getAllStatement); |
|
List<Datafile> resultList = new ArrayList<>(); |
|
|
|
while (results.next()) { |
|
resultList.add(new Datafile(results.getString(1), results.getLong(2))); |
|
} |
|
return resultList.iterator(); |
|
|
|
} catch (SQLException|IOException e) { |
|
throw new RuntimeException("Error reading database", e); |
|
} |
|
} |
|
|
|
// ———————————————————————- private |
|
|
|
private Connection createConnection() throws IOException { |
|
try { |
|
|
|
return DriverManager.getConnection(conf.get(JDBC_URL), |
|
conf.get(JDBC_USERNAME, ""), |
|
conf.get(JDBC_PASSWORD, "")); |
|
} catch (Exception e) { |
|
throw new IOException("Exception creating connection to " |
|
+ conf.get(JDBC_URL), e); |
|
} |
|
} |
|
|
|
// Counts the number of rows for a set. |
|
String makeCountStatement() { |
|
return "select count(*) from datafiles where " |
|
+ "table_name = '" + tableName + "' " |
|
+ "and storage_type = '" + storageType + "';"; |
|
} |
|
|
|
// Counts the number of rows for a set and a given datafile. |
|
String makeCountStatement(String filename, long timestamp) { |
|
return |
|
"select count(*) from datafiles where " |
|
+ "and filename = '" + filename + "' " |
|
+ "and timestamp = '" + timestamp + "' " |
|
+ "and table_name = '" + tableName + "' " |
|
+ "and storage_type = '" + storageType + "';"; |
|
} |
|
|
|
private int countRows(Statement statement) throws SQLException { |
|
String countStatement = makeCountStatement(); |
|
ResultSet previousValue = statement.executeQuery(countStatement); |
|
previousValue.next(); |
|
|
|
return previousValue.getInt(1); |
|
} |
|
|
|
private int countRows(Statement statement, |
|
String filename, |
|
long timestamp) throws SQLException { |
|
|
|
String countStatement = makeCountStatement(filename, timestamp); |
|
ResultSet previousValue = statement.executeQuery(countStatement); |
|
previousValue.next(); |
|
|
|
return previousValue.getInt(1); |
|
} |
|
|
|
String makeInsertStatement(String filename, long timestamp) { |
|
return |
|
"insert into datafiles " |
|
+ "(filename, timestamp, table_name, storage_type) " |
|
+ "values (" |
|
+ "'" + filename + "', " |
|
+ timestamp + ", " |
|
+ "'" + tableName + "', " |
|
+ "'" + storageType + "');"; |
|
} |
|
|
|
// Get all rows for a given set. |
|
String makeGetAllStatement() { |
|
return "select filename, timestamp from datafiles where " |
|
+ "table_name = '" + tableName + "' " |
|
+ "and storage_type = '" + storageType + "';"; |
|
|
|
} |
|
} |