A source represents an endpoint for receiving events.
Sources are managed using the Stream API. The full API reference is available at https://stream.keboola.com/v1/documentation/, and the OpenAPI specification is available at https://stream.keboola.com/v1/documentation/openapi3.json.
Events are received via HTTP. Each source can be associated with up to 20 sinks, which represent mappings from event data to columns in a destination table. Data may be mapped using pre-defined mappings or a custom template.
Each column in a sink mapping is defined by the following fields:

Field | Type | Description
---|---|---
`name` | string | Name of the column. Names must be unique.
`type` | string | The type of the column. Available types and their descriptions are listed below.
`primaryKey` | boolean | Marks this column as part of the primary key. Multiple columns may be part of the primary key at the same time.
The available column types are:

Type | Description
---|---
`id` | Event ID
`datetime` | Time of the event
`ip` | IP address of the event sender
`body` | The unaltered event body
`headers` | The unaltered request headers
`path` | A single field from the JSON object
`template` | A custom mapping using a template language
The `path` column type can be used to fetch a single field from a JSON object. Optionally, you can use the `rawString` option to remove the quotes around a JSON string, or the `defaultValue` option to define the value used when the field doesn't exist.
```json
{
  "type": "path",
  "name": "id",
  "path": "issue.id",
  "defaultValue": "undefined",
  "rawString": true
}
```
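For illustration, assume an event arrives with the following (hypothetical) request body:

```json
{
  "issue": {
    "id": 1234,
    "title": "Crash on startup"
  }
}
```

With the column definition above, the mapped value is `1234`. If `issue.id` were missing, the configured `defaultValue` of `undefined` would be stored instead, and because `rawString` is `true`, a JSON string value such as `"closed"` would be stored without the surrounding quotes.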
Note: It is recommended to use the faster `path` type instead of the jsonnet function `Body(string)` when possible.
The `template` column type currently supports the jsonnet templating language. The following jsonnet globals are available:

Name | Description | Usage example | Example value
---|---|---|---
`Ip()` | IP address of the client. | `Ip()` | `127.0.0.1`
`Body()` | Get the entire request body as an object. | `Body()` | `{ "a": "b" }`
`Body(string)` | Get a field from the request body by path. Fails if the field does not exist; in that case, the record will not be saved. | `Body("deeply.nested.path")` | `1000`
`Body(string, any)` | Get a field from the request body by path, or a default value if the field does not exist. | `Body("deeply.nested.path", 2000)` | `1000`
`BodyStr()` | Get the entire request body as a string. | `BodyStr()` | `"{\"a\":\"b\"}"`
`Header()` | Get all request headers. | `Header()` | `{ "Content-Type": "application/json" }`
`Header(string)` | Get the value of a single request header. Fails if the header does not exist; in that case, the record will not be saved. | `Header("Content-Type")` | `"application/json"`
`Header(string, string)` | Get the value of a single request header, or a default value if the header does not exist. | `Header("Content-Type", "text/plain")` | `"application/json"`
`HeaderStr()` | Get the request headers as a string, each line containing one "header: value" pair. The lines are sorted alphabetically. | `HeaderStr()` | `Content-Type: application/json`
`Now()` | Get the current UTC datetime as a string in the default format. | `Now()` | `"2023-01-14T08:04:05.123Z"`
`Now(string)` | Get the current UTC datetime as a string in a custom strftime-compatible format. | `Now("%Y-%m-%d")` | `"2023-01-14"`
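As a sketch of how these globals can be combined, the following column definition uses the `template` type with a short jsonnet snippet. The exact shape of the `template` object (the `language` and `content` fields) is an assumption here; consult the OpenAPI specification linked above for the authoritative schema.

```json
{
  "type": "template",
  "name": "summary",
  "template": {
    "language": "jsonnet",
    "content": "{ id: Body('issue.id', null), from: Ip(), receivedAt: Now() }"
  }
}
```

Using the two-argument `Body(string, any)` form means the record is still saved even when `issue.id` is missing from the event body.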
Incoming events are mapped to the schema defined in each sink, and each new row is appended to a CSV file on the local hard disk (local storage).
When the local storage accumulates enough records or a short time passes, the records from local storage are appended to a CSV file stored in your Keboola project (staging storage).
Once certain conditions are met, the data from the file is imported into the destination table (target storage). These conditions are defined by the sink:

Condition | Minimum | Maximum | Default
---|---|---|---
time | 30 seconds | 24 hours | 1 minute
size | 100 B | 500 MB | 50 MB
count | 1 | 10 million | 50 thousand
Changing these conditions will trigger an immediate import of waiting files, after which the stream will follow the updated conditions.
Sources can be created using the `POST /v1/branches/{branchId}/sources` endpoint.
If a source or sink `id` is omitted, it will be generated from the corresponding `name` field.
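A minimal sketch of a request body for this endpoint is shown below. The `type` and `description` fields are assumptions for illustration; since `id` is omitted, it would be derived from `name`. See the OpenAPI specification for the exact schema.

```json
{
  "type": "http",
  "name": "GitHub Webhooks",
  "description": "Receives webhook events from GitHub repositories"
}
```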
A source may be created without any sinks. The sinks can then be created separately using the `POST /v1/branches/{branchId}/sources/{sourceId}/sinks` endpoint.
Warning: Events sent to a source without any sinks will be permanently lost. This is because data is buffered per sink, not per source.
The requests are asynchronous and create a task that must be completed before the source or sink is ready to use. The task status can be checked using the `GET /v1/branches/{branchId}/sources/{sourceId}/tasks/{taskId}` endpoint.
Sink tables are created if they do not exist. If they already exist, the schema defined by `sink.columns` must match the existing schema. If the table schema is manually altered and no longer matches, the import from staging storage to the table will fail. The data is kept in staging storage for up to 7 days, during which you can recover from any failures.
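The following is a sketch of a sink creation payload combining the column types described earlier. The `mapping.tableId` and `mapping.columns` paths follow the names used elsewhere in this section, but the surrounding structure and the table ID value are illustrative assumptions:

```json
{
  "name": "GitHub Issues",
  "mapping": {
    "tableId": "in.c-github.issues",
    "columns": [
      { "type": "id", "name": "id", "primaryKey": true },
      { "type": "datetime", "name": "received_at" },
      { "type": "path", "name": "issue_id", "path": "issue.id" },
      { "type": "body", "name": "body" }
    ]
  }
}
```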
Sources may be deleted using the `DELETE /v1/branches/{branchId}/sources/{sourceId}` endpoint. Sinks may be deleted using the `DELETE /v1/branches/{branchId}/sources/{sourceId}/sinks/{sinkId}` endpoint.
A source may be updated using the `PATCH /v1/branches/{branchId}/sources/{sourceId}` endpoint. Sinks may be updated using the `PATCH /v1/branches/{branchId}/sources/{sourceId}/sinks/{sinkId}` endpoint.
The `UpdateSource` endpoint may only update the source's name; sinks must be updated separately using their own endpoint.
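Since only the name can change on the source itself, a minimal sketch of the PATCH body (with an illustrative value) is:

```json
{
  "name": "GitHub Webhooks (production)"
}
```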
If a sink's `mapping.tableId` is updated, it is handled the same way as in the create operation: if the table exists, `mapping.columns` must match the existing table's schema; if the table does not exist, it is created.
The import conditions mentioned above can be accessed using the `GET /v1/branches/{branchId}/sources/{sourceId}/settings` endpoint and changed using the `PATCH /v1/branches/{branchId}/sources/{sourceId}/settings` endpoint.
The same settings also exist for each sink; in that case, use the `GET /v1/branches/{branchId}/sources/{sourceId}/sinks/{sinkId}/settings` and `PATCH /v1/branches/{branchId}/sources/{sourceId}/sinks/{sinkId}/settings` endpoints.
Depending on your use case, you may require different delivery guarantees for your stream. Follow the guidelines below to ensure the desired outcome.
To ensure that no record is delivered twice, make sure the client doesn't retry when sending records. In this case, it's beneficial to use the settings endpoints to set `"storage.level.local.encoding.sync.wait"` to `false` to increase throughput.
To ensure that every record is delivered at least once, the client needs to implement retries when sending records. Also, use the settings endpoints to confirm that `"storage.level.local.encoding.sync.wait"` is set to `true` (the default behavior). Note that this setting guarantees that the record is written to the local disk.
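As a sketch of such a settings change through the PATCH settings endpoints mentioned above: the `storage.level.local.encoding.sync.wait` key is the one discussed in this section, but the payload wrapper around it is an assumption.

```json
{
  "settings": [
    {
      "key": "storage.level.local.encoding.sync.wait",
      "value": false
    }
  ]
}
```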
A token is generated for each sink of a source. These tokens have the minimum possible scope, with write permission only for the bucket in which the destination table is stored. You can view these tokens at https://connection.keboola.com/admin/projects/<project-id>/tokens-settings. Their description follows the format `[_internal] Stream Sink <source-id>/<sink-id>`.
These tokens should not be deleted or refreshed manually. To refresh a token, you can disable and re-enable the sink.
To connect Keboola with Apache Kafka® and ingest data from Kafka topics via data streams, use the Kafka Connect HTTP Sink Connector to establish a communication channel between Kafka and Keboola.
The Kafka Connect HTTP Sink Connector acts as a bridge, seamlessly integrating Kafka with Keboola’s Data Stream HTTP API. Here’s a breakdown of the process:
Key Points to Remember: