sdk-python
January 19, 2024
Python SDK for CDEvents
The SDK can be used to create CDEvents in CloudEvents form, for use with the CloudEvents Python SDK.
Disclaimer 🚧
This SDK is a work in progress. It will be maintained in sync with the specification, and it currently covers all events from the specification.
Get started
Clone the repo and install the Python SDK as a package:
cd src && pip install .
Then import the module in your code:
import cdevents
Create your first CDEvent
To create a CDEvent, for instance a pipelineRun queued event:
import datetime

import cdevents

event = cdevents.new_pipelinerun_queued_event(
    context_id="my-context-id",
    context_source="my/first/cdevent/program",
    context_timestamp=datetime.datetime.now(),
    subject_id="myPipelineRun1",
    custom_data={"hello_message": "hi!"},
    subject_source="subjectSource",
    custom_data_content_type="application/json",
    pipeline_name="myPipeline",
    url="https://example.com/myPipeline",
)
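The example passes `datetime.datetime.now()`, which returns a naive local time with no UTC offset. As a minimal sketch using only the standard library, a timezone-aware UTC timestamp could be built as shown here. Whether the SDK requires or normalizes timezone information is not stated in this README, so treat this as a cautious default rather than a requirement. `utc_timestamp` is a hypothetical helper name, not part of the `cdevents` package.

```python
import datetime


def utc_timestamp() -> datetime.datetime:
    """Return the current time as a timezone-aware datetime in UTC.

    Hypothetical helper for illustration; not part of the cdevents package.
    """
    return datetime.datetime.now(datetime.timezone.utc)


timestamp = utc_timestamp()

# A timezone-aware value carries its UTC offset when serialized,
# so a receiver does not have to guess the sender's local zone.
print(timestamp.isoformat())
```

The resulting value could then be passed as `context_timestamp` in place of the naive one.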
Send your first CDEvent as CloudEvent
To send a CDEvent as a CloudEvent:
import cdevents
import requests
from cloudevents.http import to_structured

# Create a CloudEvent from the CDEvent
cloudevent = cdevents.to_cloudevent(event)

# Create the HTTP request representation of the CloudEvent in structured content mode
headers, body = to_structured(cloudevent)

# POST the CloudEvent to the receiver
requests.post("<some-url>", data=body, headers=headers)
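To make "structured content mode" concrete: in that mode the whole CloudEvent, attributes and data together, travels as one JSON document in the request body, with a `content-type` of `application/cloudevents+json`. The sketch below imitates that layout using only the standard library. It is an illustration under stated assumptions, not the CloudEvents SDK's implementation: `to_structured_sketch`, the event type string, and the target URL are all hypothetical placeholders.

```python
import json
import urllib.request
import uuid


def to_structured_sketch(event_type: str, source: str, data: dict):
    """Build headers and body for a CloudEvent in structured content mode.

    Hypothetical stand-in for illustration only; real code should rely on
    the CloudEvents Python SDK instead.
    """
    envelope = {
        "specversion": "1.0",  # CloudEvents specification version
        "id": str(uuid.uuid4()),  # unique identifier for this event
        "source": source,
        "type": event_type,
        "datacontenttype": "application/json",
        "data": data,
    }
    headers = {"content-type": "application/cloudevents+json"}
    body = json.dumps(envelope).encode("utf-8")
    return headers, body


headers, body = to_structured_sketch(
    event_type="com.example.placeholder",  # placeholder, not a real CDEvents type
    source="my/first/cdevent/program",
    data={"hello_message": "hi!"},
)

# Prepare (but do not send) a POST request to a placeholder URL.
request = urllib.request.Request(
    "https://example.com/events", data=body, headers=headers, method="POST"
)
print(request.get_method(), request.full_url)
print(body.decode("utf-8"))
```

The request is only constructed here, never sent, so the sketch can be run without a receiving endpoint.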
See the CloudEvents docs as well.