Data Storage¶
Confluo operates on data streams. Each stream comprises records, each of which follows a pre-defined schema over a collection of strongly-typed attributes.
Attributes¶
Confluo currently supports only bounded-width attributes¹. An attribute has bounded width if every record in a stream uses at most a fixed number of bits to represent that attribute; this includes primitive data types such as binary, integral or floating-point values, as well as domain-specific types such as IP addresses, ports, sensor readings, etc. Confluo also requires each record in the stream to have an 8-byte nanosecond-precision timestamp attribute; if the application does not assign timestamps, Confluo internally assigns one during the write operation.
Schema¶
A schema in Confluo is a collection of strongly-typed attributes. It is specified using JSON-like syntax; for instance, consider the example below of a simple schema with five attributes:
{
timestamp: ULONG,
op_latency_ms: DOUBLE,
cpu_util: DOUBLE,
mem_avail: DOUBLE,
log_msg: STRING(100)
}
The first attribute is timestamp, with an 8-byte unsigned integer type; the second, third and fourth attributes correspond to operation latency (in ms), CPU utilization and available memory respectively, all with double-precision floating-point type. The final attribute is a log message, with a string type bounded by 100 characters. Note that each attribute must have a type associated with it, and each record in a stream with this schema must have its attributes in this order. While Confluo natively supports common primitive types, you can add custom bounded-width data types to Confluo's type system. More details can be found in the Confluo Type-System guide.
Atomic MultiLog¶
Atomic MultiLogs are the basic storage abstraction in Confluo, and are similar in interface to database tables. In order to store data from different streams, applications can create an Atomic MultiLog with a pre-specified schema, and write data streams that conform to the schema to the Atomic MultiLog.
To support queries, applications can add an index for individual attributes in the schema. Confluo also employs a match-action language with three main elements: filter, aggregate and trigger.
- A Confluo filter is an expression comprising relational and boolean operators (see the tables below) over an arbitrary subset of bounded-width attributes, and identifies records that match the expression.
- A Confluo aggregate evaluates a computable function on an attribute for all records that match a certain filter expression.
- Finally, a Confluo trigger is a boolean conditional (e.g., <, >, =, etc.) evaluated over a Confluo aggregate.
Relational Operators in Filters:

| Operator | Examples |
|---|---|
| Equality | dst_port=80 |
| Range | cpu_util>0.8 |
Boolean Operators in Filters:

| Operator | Examples |
|---|---|
| Conjunction | volt>200 && temp>100 |
| Disjunction | cpu_util>0.8 \|\| mem_avail<0.1 |
| Negation | transport_protocol != TCP |
Confluo supports indexes, filters, aggregates and triggers only on bounded-width attributes in the schema. Once added, each of these is evaluated and updated upon the arrival of each new batch of data records.
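For instance, relational and boolean operators compose freely within a single filter expression. The sketch below uses the `add_filter` call introduced later in this guide (on an Atomic MultiLog handle `mlog`); the filter name is hypothetical:

// A sketch: a conjunctive filter combining two range predicates.
// The name "hot_and_starved" is hypothetical.
mlog->add_filter("hot_and_starved", "cpu_util>0.8 && mem_avail<0.1");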
A Performance Monitoring and Diagnosis Example¶
We will now see how we can create Atomic MultiLogs, add indexes, filters, aggregates and triggers on them, and finally load some data into them, for both embedded and stand-alone modes of operation. We will work with the example of a performance monitoring and diagnosis tool using Confluo.
Embedded mode¶
In order to use Confluo in the embedded mode, we simply need to include Confluo's header files under `libconfluo/confluo`, use the Confluo C++ API in a C++ application, and compile using a modern C++ compiler. The entry-point header file to include is `confluo_store.h`.
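Before walking through each step, the sketch below shows the scaffolding that the snippets in the following subsections plug into; the data path is a placeholder, and build details (include paths, a C++11 or later compiler) depend on your checkout:

#include "confluo_store.h"

int main() {
  // Create a store; "/path/to/data" is a placeholder for your data directory.
  confluo::confluo_store store("/path/to/data");
  // ... create an Atomic MultiLog, add indexes/filters/aggregates/triggers,
  // and append records, as shown in the subsections below ...
  return 0;
}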
Creating a New Confluo Store¶
We first create a new Confluo Store, specifying the data path for it to use:
confluo::confluo_store store("/path/to/data");
Creating a New Atomic MultiLog¶
We then create a new Atomic MultiLog within the Store (analogous to a database table); this requires three parameters: a name for the Atomic MultiLog, a fixed schema, and a storage mode:
std::string schema = R"({
  timestamp: ULONG,
  op_latency_ms: DOUBLE,
  cpu_util: DOUBLE,
  mem_avail: DOUBLE,
  log_msg: STRING(100)
})";
auto storage_mode = confluo::storage::IN_MEMORY;
store.create_atomic_multilog("perf_log", schema, storage_mode);
Our Atomic MultiLog adopts the same schema outlined above. The storage mode is set to in-memory here, but can be any of the following:
| Storage Mode | Description |
|---|---|
| IN_MEMORY | All data is written purely in memory, and no attempt is made to persist data to secondary storage. |
| DURABLE | Only the raw data (i.e., the raw bytes corresponding to each record) is persisted to secondary storage for each write. The write is not considered complete until its effects have been persisted to secondary storage. |
| DURABLE_RELAXED | Only the raw data (i.e., the raw bytes corresponding to each record) is persisted to secondary storage; however, the data is buffered in memory and only persisted periodically, instead of on every write. This generally leads to better write performance. |
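For example, to favor write throughput over strict durability, an Atomic MultiLog could be created with the relaxed mode instead; a minimal sketch, changing only the storage mode in the snippet above (the multilog name is hypothetical):

// A sketch: same create call, but with relaxed durability.
auto relaxed_mode = confluo::storage::DURABLE_RELAXED;
store.create_atomic_multilog("perf_log_durable", schema, relaxed_mode);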
We then obtain a pointer to our newly created Atomic MultiLog:
confluo::atomic_multilog* mlog = store.get_atomic_multilog("perf_log");
Adding Indexes¶
We can define indexes on the Atomic MultiLog as follows:
mlog->add_index("op_latency_ms");
to add an index on the `op_latency_ms` attribute.
Adding Filters¶
We can also install filters as follows:
mlog->add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1");
to identify records that indicate low system resources (CPU utilization > 80% or available memory < 10%), using a filter named `low_resources`.
Adding Aggregates¶
Additionally, we can add aggregates on filters as follows:
mlog->add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)");
This adds a new stored aggregate `max_latency_ms` on the filter `low_resources` we defined before. In essence, it records the highest operation latency reported in any record that also indicated low available resources.
Installing Triggers¶
Finally, we can install a trigger on aggregates as follows:
mlog->install_trigger("high_latency_trigger", "max_latency_ms > 1000");
This installs a trigger `high_latency_trigger` on the aggregate `max_latency_ms`, which generates an alert whenever the condition `max_latency_ms > 1000` is satisfied, i.e., whenever the maximum latency for an operation exceeds 1s while available resources are low.
Loading sample data into Atomic MultiLog¶
We are now ready to load some data into this Atomic MultiLog. Atomic MultiLogs only support the addition of new data via appends. However, new data can be appended in several ways:
Appending String Vectors¶
This version of the `append` method takes a vector of strings as its input, where the vector corresponds to a single record. The number of entries in the vector must match the number of attributes in the schema, with the exception of the timestamp: if the timestamp is not provided, Confluo will automatically assign one.
size_t off1 = mlog->append({"100", "0.5", "0.9", "INFO: Launched 1 tasks"});
size_t off2 = mlog->append({"500", "0.9", "0.05", "WARN: Server {2} down"});
size_t off3 = mlog->append({"1001", "0.9", "0.03", "WARN: Server {2, 4, 5} down"});
Also note that the operation returns a unique offset for each append. This offset forms the "key" for records stored in the Atomic MultiLog: records can be retrieved by specifying their corresponding offsets.
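As a quick illustration of offsets acting as keys, a record can be read back by the offset its append returned (a sketch; the `read` accessor is described in the Confluo Queries guide):

// A sketch: read back the record appended at offset off1.
// See the Confluo Queries guide for the read API.
auto record = mlog->read(off1);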
Appending Raw bytes¶
This version of `append` takes as its input a pointer to a C/C++ struct that maps exactly to the Atomic MultiLog's schema. For instance, our schema would map to the following C/C++ struct:
struct perf_log_record {
int64_t timestamp;
double op_latency_ms;
double cpu_util;
double mem_avail;
char log_msg[100];
};
Note that `log_msg` maps to a `char[100]` rather than an `std::string`. To add a new record, we would populate a struct instance and pass its address to the append function:
int64_t ts = utils::time_utils::cur_ns();
perf_log_record rec = { ts, 2000.0, 0.95, 0.01, "WARN: Server {2, 4, 5} down" };
size_t off4 = mlog->append(&rec);
Note that this is a more efficient variant of append, since it avoids the overhead of parsing strings into the corresponding attribute data types.
Batched Appends¶
It is also possible to batch multiple record appends into a single append. The first step in building a batch is to obtain a batch builder:
auto batch_bldr = mlog->get_batch_builder();
The batch builder supports adding new records via both string vector and raw byte interfaces:
batch_bldr.add_record({ "400", "0.85", "0.07", "WARN: Server {2, 4} down"});
perf_log_record rec = { utils::time_utils::cur_ns(), 100.0, 0.65, 0.25, "WARN: Server {2} down" };
batch_bldr.add_record(&rec);
Once the batch is populated, we can append the batch to the Atomic MultiLog as follows:
size_t off5 = mlog->append_batch(batch_bldr.get_batch());
To understand how we can query the data we have loaded so far, read the guide on Confluo Queries.
Stand-alone Mode¶
In the stand-alone mode, Confluo runs as a daemon server, serving client requests using the Apache Thrift protocol. To start the server, run:
confluod --address=127.0.0.1 --port=9090
Once the server daemon is running, you can send requests to it using the C++/Python/Java client APIs. Note that the C++ Client API is almost identical to the embedded mode API.
We look at the same performance monitoring and diagnosis tool example for the stand-alone mode. The relevant header file to include for the C++ Client API is `rpc_client.h`.
Creating a Client Connection¶
To begin with, we first have to establish a client connection with the server.
confluo::rpc::rpc_client client("127.0.0.1", 9090);
from confluo.rpc.client import RpcClient
client = RpcClient("127.0.0.1", 9090)
The first argument to the `rpc_client` constructor corresponds to the server hostname, while the second argument corresponds to the server port.
Creating a New Atomic MultiLog¶
We then create a new Atomic MultiLog within the Store (analogous to a database table); as before, this requires three parameters: a name for the Atomic MultiLog, a fixed schema, and a storage mode:
std::string schema = R"({
  timestamp: ULONG,
  op_latency_ms: DOUBLE,
  cpu_util: DOUBLE,
  mem_avail: DOUBLE,
  log_msg: STRING(100)
})";
auto storage_mode = confluo::storage::IN_MEMORY;
client.create_atomic_multilog("perf_log", schema, storage_mode);
from confluo.rpc.storage import StorageMode
schema = """{
timestamp: ULONG,
op_latency_ms: DOUBLE,
cpu_util: DOUBLE,
mem_avail: DOUBLE,
log_msg: STRING(100)
}"""
storage_mode = StorageMode.IN_MEMORY
client.create_atomic_multilog("perf_log", schema, storage_mode)
This operation also internally sets the current Atomic MultiLog for the client to the one we just created (i.e., `perf_log`). It is also possible to explicitly set the current Atomic MultiLog for the client as follows:
client.set_current_atomic_multilog("perf_log");
client.set_current_atomic_multilog("perf_log")
Note: It is necessary to set the current Atomic MultiLog for the `rpc_client`. Issuing requests via the client without setting the current Atomic MultiLog will result in exceptions.
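In other words, a fresh client must select its Atomic MultiLog before issuing any other request; a minimal sketch of the required ordering, using only calls shown in this guide:

confluo::rpc::rpc_client client("127.0.0.1", 9090);
client.set_current_atomic_multilog("perf_log");  // select the multilog first
client.add_index("op_latency_ms");               // subsequent requests now succeed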
Adding Indexes¶
We can define indexes as follows:
client.add_index("op_latency_ms");
client.add_index("op_latency_ms")
Adding Filters¶
We can also install filters as follows:
client.add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1");
client.add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1")
Adding Aggregates¶
Additionally, we can add aggregates on filters as follows:
client.add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)");
client.add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)")
Installing Triggers¶
Finally, we can install a trigger on an aggregate as follows:
client.install_trigger("high_latency_trigger", "max_latency_ms > 1000");
client.install_trigger("high_latency_trigger", "max_latency_ms > 1000")
Loading sample data into Atomic MultiLog¶
We are now ready to load some data into the Atomic MultiLog on the server.
Appending String Vectors¶
size_t off1 = client.append({"100", "0.5", "0.9", "INFO: Launched 1 tasks"});
size_t off2 = client.append({"500", "0.9", "0.05", "WARN: Server {2} down"});
size_t off3 = client.append({"1001", "0.9", "0.03", "WARN: Server {2, 4, 5} down"});
off1 = client.append([100.0, 0.5, 0.9, "INFO: Launched 1 tasks"])
off2 = client.append([500.0, 0.9, 0.05, "WARN: Server {2} down"])
off3 = client.append([1001.0, 0.9, 0.03, "WARN: Server {2, 4, 5} down"])
Batched Appends¶
It is also possible to batch multiple record appends into a single append operation via the client API. This is particularly useful since batching helps amortize the cost of network latency.
The first step in building a batch is to obtain a batch builder:
auto batch_bldr = client.get_batch_builder();
batch_bldr = client.get_batch_builder()
As in the embedded mode, the batch builder supports adding new records via both string vector and raw byte interfaces; here we use string vectors:
batch_bldr.add_record({ "400", "0.85", "0.07", "WARN: Server {2, 4} down"});
batch_bldr.add_record({ "100", "0.65", "0.25", "WARN: Server {2} down" });
batch_bldr.add_record([ 400.0, 0.85, 0.07, "WARN: Server {2, 4} down" ])
batch_bldr.add_record([ 100.0, 0.65, 0.25, "WARN: Server {2} down" ])
Once the batch is populated, we can append the batch as follows:
size_t off4 = client.append_batch(batch_bldr.get_batch());
off4 = client.append_batch(batch_bldr.get_batch())
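Since a batch travels to the server as a single request, a common pattern is to accumulate many records before shipping it; a sketch using only the calls shown above, with illustrative record values (timestamps omitted so Confluo assigns them):

auto bldr = client.get_batch_builder();
for (size_t i = 0; i < 100; i++) {
  // Illustrative record; attribute order follows the schema (minus timestamp).
  bldr.add_record({ "100", "0.5", "0.9", "INFO: Launched 1 tasks" });
}
size_t off = client.append_batch(bldr.get_batch());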
Details on querying the data via the client interface can be found in the guide on Confluo Queries.
1. We plan on adding support for variable-width data types in a future release.